/* Changed by Gaurav: the "#ifdef PARALLEL" guard is disabled; it is kept
 * here, commented out, for reference:
#ifdef PARALLEL
*/

/* parallel.c */

/* Author Wes Terpstra  */
/* Modifications and bug fixes by sness */

#include "clam.h"
#include "parallel.h"
#include "serial.h"

#include <pthread.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <syslog.h>
#include <netdb.h>
#include <signal.h>
#include <pwd.h>

/* How this system works:
 *
 * We have worker threads - they are passed a clone id # and generate the
 * corresponding rows in the matrix.
 *
 * We have a localDispatcher which gives out clone #s to threads running on
 * the system.
 *
 * We have a daemonPusher which gets clone #s from the localDispatcher on
 * the client (the machine with the GUI).
 *
 * We have a slaveDispatcher which gets clone #s from the daemonPusher via
 * TCP/IP and then dispatches them to its threads.
 *
 * Finally we have a routine to locate daemon CPUs via broadcast UDP.
 */

/* We use signals to wakeup from blocking reads and to detect bad connections.
 * This is a pain b/c we need the signals to arrive at specific threads.
 * Presently signals SIGPIPE, SIGALRM, SIGTERM, SIGINT are the only ones used.
 * SIGPIPE is needed to tell us a connection went bad.
 * SIGALRM is needed to interrupt a blocking read & trigger keep alive pings.
 *
 * We want only the main thread to receive SIGINT & SIGTERM at all times.
 *
 * This lists what each component ever uses.
 *
 * FPC client machine:
 *   Find daemons sub:     Does pings, interrupted I/O.
 *   The local dispatcher: No networking
 *   The local workers:    No networking
 *   The daemon pusher:    Does keep alive pings, needs interrupted I/O, and
 *                         needs to manage the daemon connections.
 *
 * Hence: on the client, daemon pusher&finder get SIGPIPE & SIGALRM
 * The finder & localDispatcher are main thread: SIGTERM & SIGINT
 *
 * FPCd helper:
 *   The remote workers:   They do writes over TCP/IP.
 *						   This is bad news b/c we could sigpipe here.
 * 						   However, the dispatcher could get worker's sigpipe
 * 						   and then figure out that we need to quit.
 *   The slave dispatcher: Reads over TCP/IP, sends keep alive pings (but does
 *                         not use SIGALRM for this), uses interrupted I/O.
 *
 * Hence: on the daemon, workers need to IGNORE SIGPIPE.
 *        dispatcher: needs to receive SIGALRM (interrupted IO)
 *                    needs to receive SIGPIPE (talks over TCP/IP)
 *					  needs to receive SIGINT & SIGTERM
 *
 */

/* Serializes worker access to the dispatcher: whoever holds this mutex is
 * the next worker allowed to talk to the dispatcher. It is locked by
 * initMutexes() so no one can request orders before a dispatcher starts. */
static pthread_mutex_t acquireOrdersMutex;

/* Used to have exactly one worker wait and let the dispatcher know */
static pthread_cond_t  workerWaitingCond;
static pthread_mutex_t workerWaitingCondMutex;

/* Used to receive orders from the dispatcher */
static int             ordersValue;     /* clone # handed out; -1 means stop */
static int             previousStatus;  /* worker's status from its last order */
static pthread_cond_t  ordersCond;
static pthread_mutex_t ordersCondMutex;

/* Is the daemonPusher running? 1 = yes, 0 = no */
static int daemonPusherRunning;
static time_t lastStatus;               /* when stats were last flushed to disk */
static long long int    lastJobCount;   /* jobCount value at the last flush */
static long long int    lastCalcCount;  /* calcCount value at the last flush */
static long long int    calcCount;      /* total pairwise compares performed */
long long int jobCount;                 /* total clone rows dispatched (non-static: visible elsewhere) */

int testFlag = 1;  /* NOTE(review): never read in this file -- purpose unclear, confirm before removing */

#ifndef NO_DAEMON_LOG
/* Reset every throughput counter and stamp the current time as the moment
 * statistics were last flushed. Called once when a dispatcher starts. */
void initStats()
{
	lastStatus    = time(0);

	jobCount      = 0;
	lastJobCount  = 0;
	calcCount     = 0;
	lastCalcCount = 0;
}

/* Append a throughput record to the daemon log and snapshot the counters.
 *
 * Record format: "<epoch> <yyyy/mm/dd-hh:mm:ss> <jobCount> <calcCount>".
 * The region is lockf()-locked while writing because multiple fpcd
 * processes may append to /var/log/fpcd/clones.log concurrently.
 * Failure to open the log is silently ignored (stats are best-effort).
 */
void writeStats()
{
	FILE*  logFile;
	time_t now;
	char   timeBuf[100];

	now = time(0);

	if ((logFile = fopen("/var/log/fpcd/clones.log", "a")) != (FILE*)0)
	{
		lockf(fileno(logFile), F_LOCK, 0);
		strftime(timeBuf, sizeof(timeBuf), "%Y/%m/%d-%H:%M:%S", localtime(&now));
		/* BUG FIX: cast time_t to long explicitly -- %li requires a long,
		 * and time_t is not guaranteed to be long on every platform. */
		fprintf(logFile, "%li %s %lli %lli\n",
				(long)now, timeBuf, jobCount, calcCount);
		lockf(fileno(logFile), F_ULOCK, 0);
		fclose(logFile);
	}

	lastJobCount  = jobCount;
	lastCalcCount = calcCount;
	lastStatus    = now;
}

/* Flush one final statistics record when the dispatcher shuts down. */
void closeStats()
{
    writeStats();
}
#endif

/* Near-do-nothing signal handler: installed without SA_RESTART so that a
 * SIGPIPE/SIGALRM makes blocking I/O return with EINTR (interruptible
 * reads); it also records that a keep-alive ping is wanted.
 *
 * needPing is volatile sig_atomic_t -- the only object type that may be
 * safely written from an async signal handler and read elsewhere
 * (CERT SIG31-C). It was previously a plain int.
 */
static volatile sig_atomic_t needPing;
static void pingHandler(int sig)
{
	(void)sig; /* unused; required by the sa_handler signature */
	needPing = 1;
}

/* Initialize all the mutexes we use in the parallel process */
/* Create every mutex and condition variable the parallel engine uses.
 * Deliberately leaves acquireOrdersMutex locked so that no worker can
 * request orders until a dispatcher has started and released it.
 * Always returns 0. */
int initMutexes()
{
	pthread_cond_init(&workerWaitingCond, 0);
	pthread_cond_init(&ordersCond, 0);

	pthread_mutex_init(&acquireOrdersMutex, 0);
	pthread_mutex_init(&workerWaitingCondMutex, 0);
	pthread_mutex_init(&ordersCondMutex, 0);

	/* Hold this until a dispatcher is ready to hand out orders */
	pthread_mutex_lock(&acquireOrdersMutex);

	return 0;
}

/* Release all the mutexes we were using in the parallel process */
/* Tear down everything created by initMutexes(). Always returns 0. */
int destroyMutexes()
{
	pthread_cond_destroy(&ordersCond);
	pthread_cond_destroy(&workerWaitingCond);

	pthread_mutex_destroy(&ordersCondMutex);
	pthread_mutex_destroy(&workerWaitingCondMutex);
	pthread_mutex_destroy(&acquireOrdersMutex);

	return 0;
}

/* This will get new orders for the worker and pass the status of the job
 * back to the dispatcher.
 */
/* This will get new orders for the worker and pass the status of the job
 * back to the dispatcher.
 *
 * 'status' reports the outcome of this worker's previous order
 * (STATUS_OK / STATUS_FAIL_*); it reaches the dispatcher through the
 * shared previousStatus variable. Returns the next clone # to process,
 * or -1 as the order to stop.
 *
 * NOTE(review): pthread_cond_wait is used without a predicate loop, so
 * this handshake assumes the dispatcher signals ordersCond exactly once
 * per announced worker and that no spurious wakeups occur -- confirm
 * before restructuring any of the locking below.
 */
int acquireWorkerOrders(int status)
{
	int    myOrders;
		
	/* Firstly, only one worker should be getting/returning orders from/to 
	 * the dispatcher at a time. Whoever holds this mutex is who will next
	 * talk to the dispatcher */
	pthread_mutex_lock(&acquireOrdersMutex);
	
	/* Now, we want to obtain orders from the dispatcher. He will tell us
	 * when the orders are ready, but we have to make sure he doesn't tell
	 * us the orders before we are listening. So, lock the condition mutex.
	 */
	pthread_mutex_lock(&ordersCondMutex);

	/* Set the status variable to our last status - dispatcher wants to know.
     */
	previousStatus = status;
	
	/* We now tell the dispatcher there is a worker waiting for orders.
	 * He can't tell us the orders until we are ready because he needs to
	 * lock the workerWaitingCondMutex.
	 */
	pthread_mutex_lock(&workerWaitingCondMutex);
	pthread_cond_signal(&workerWaitingCond);
	pthread_mutex_unlock(&workerWaitingCondMutex);
	
	/* We can now start waiting for orders from the dispatcher. This
	 * unlocks the mutex once we're waiting. Then orders can come through.
	 */
	pthread_cond_wait(&ordersCond, &ordersCondMutex);
	
	/* Orders have arrived for us! Let's get them. */
	myOrders = ordersValue;

	/* Record stats on clone throughput every minute */
	jobCount++;
	calcCount += ZZ.size - myOrders; /* We did this many compares */
#ifndef NO_DAEMON_LOG
	if (time(0) >= lastStatus + 60) writeStats();
#endif
	
	/* Good, now lets release the condition mutex since we're not interested
	 * anymore.
	 */
	pthread_mutex_unlock(&ordersCondMutex);
	
	/* We're done getting orders, let other workers into this function. */
	pthread_mutex_unlock(&acquireOrdersMutex);
	
	return myOrders;
}

/* This will push orders back onto the dispatcher's queue.
 * Note: This only works with a localDispatcher at present.
 * I would like to change slaveDispatcher over to the queue system, but right
 * now it is unneccesary and I have limited time.
 */
/* This will push orders back onto the dispatcher's queue.
 * Note: This only works with a localDispatcher at present.
 * I would like to change slaveDispatcher over to the queue system, but right
 * now it is unneccesary and I have limited time.
 *
 * 'orders' is the clone # being handed back (e.g. after a daemon link
 * died before completing it). The stats counters are wound back because
 * this clone was counted as dispatched when it was first handed out.
 * Always returns 0.
 *
 * NOTE(review): uses the same unguarded pthread_cond_wait handshake as
 * acquireWorkerOrders -- relies on the dispatcher's strict signalling
 * discipline and on no spurious wakeups.
 */
int pushBackOrders(int orders)
{
	/* Firstly, only one caller should be getting/returning orders from/to
	 * the dispatcher at a time. Whoever holds this mutex is who will next
	 * talk to the dispatcher */
	pthread_mutex_lock(&acquireOrdersMutex);
	
	/* Now, we want to give orders to the dispatcher. He will tell us
	 * when he has gotten the orders, but we have to make sure he doesn't tell
	 * us he's done before we are listening. So, lock the condition mutex.
	 */
	pthread_mutex_lock(&ordersCondMutex);

	/* Set the status variable to say we're returing an order
     */
	previousStatus = STATUS_RETURNING_ORDER;
	ordersValue = orders;

	/* We now tell the dispatcher there are orders waiting for him.
	 * He can't tell us he's done until we are ready because he needs to
	 * lock the workerWaitingCondMutex.
	 */
	pthread_mutex_lock(&workerWaitingCondMutex);
	pthread_cond_signal(&workerWaitingCond);
	pthread_mutex_unlock(&workerWaitingCondMutex);
	
	/* We can now start waiting for the ok from the dispatcher. This
	 * unlocks the mutex once we're waiting. Then orders can come through.
	 */
	pthread_cond_wait(&ordersCond, &ordersCondMutex);
	
	jobCount--; /* count jobs dispatched for stats */
	calcCount -= ZZ.size - orders;
	
	/* Good, now lets release the condition mutex since we're not interested
	 * anymore.
	 */
	pthread_mutex_unlock(&ordersCondMutex);
	
	/* We're done returning orders, let other workers into this function. */
	pthread_mutex_unlock(&acquireOrdersMutex);
	
	return 0;
}

/* This worker just pushes stuff into his matrix and gives the dispatcher
 * his status replies.
 */
/* Worker thread body on the local (fpc client) machine. Repeatedly asks
 * the dispatcher for a clone #, builds that row of the matrix, and reports
 * status back. The returned void* carries an int: the number of rows this
 * worker completed, or STATUS_FAIL_ALGORITHM after an algorithmic failure.
 */
void* localWorkerProcedure(void* arg)
{
	sigset_t blocked;
	int      task;
	int      completed = 0;

	/* Local workers never touch the network and are not the main thread,
	 * so block SIGPIPE/SIGALRM as well as SIGINT/SIGTERM here. */
	sigemptyset(&blocked);
	sigaddset(&blocked, SIGALRM);
	sigaddset(&blocked, SIGPIPE);
	sigaddset(&blocked, SIGINT);
	sigaddset(&blocked, SIGTERM);
	pthread_sigmask(SIG_BLOCK, &blocked, 0);

	for (;;)
	{
		task = acquireWorkerOrders(STATUS_OK);

		if (task == -1) break;

		/* Build the clone matrix row our orders specify */
		if (!Zcreate_node_parallel(task))
		{
			/* Algorithm failure: drain every remaining order while
			 * reporting the error so the whole job shuts down cleanly. */
			while (task != -1)
				task = acquireWorkerOrders(STATUS_FAIL_ALGORITHM);

			completed = STATUS_FAIL_ALGORITHM;
			break;
		}

		completed++;
	}

	return (void*)completed;
}

/* Best-effort reverse lookup: return the hostname for 'addr', falling back
 * to its dotted-quad text when the lookup fails. The returned string lives
 * in static storage owned by gethostbyaddr()/inet_ntoa() -- do not free it
 * and do not expect it to survive the next call. */
const char* addressToName(struct sockaddr_in addr)
{
	struct hostent* hent = gethostbyaddr((const char*)&addr.sin_addr,
										 sizeof(addr.sin_addr), AF_INET);

	return hent ? hent->h_name : inet_ntoa(addr.sin_addr);
}

/* Our job is to look for client processors to do our bidding */
/* We return the number of daemons and put the file handles in dynamic memory
 * that we assign to the parameter. -1 on error
 */
/* Our job is to look for client processors to do our bidding.
 * We probe for fpcd daemons over UDP (broadcast, or unicast to clientIP if
 * one is configured), accept the TCP connections of daemons that answer,
 * and upload the clone data to each of them.
 *
 * On success the daemon sockets and their addresses are returned in
 * malloc()d arrays via daemonOut/addrOut and the daemon count is returned.
 * Returns -1 on error (arrays freed, sockets closed).
 */
int findDaemons(int** daemonOut, struct sockaddr_in** addrOut)
{
	int                 probe;
	struct sockaddr_in  probe_sin;
	socklen_t           probe_len;     /* socklen_t: what getsockname expects */

	int                 acceptor;
	struct sockaddr_in  acceptor_sin;
	socklen_t           acceptor_len;

	int                 daemon;
	struct sockaddr_in  daemon_sin;
	socklen_t           daemon_len;
	
	u_int32_t	        magic;
	u_int32_t           packet[2];
	int                 counter;
	
	int*                temp_daemons;
	struct sockaddr_in* temp_daemon_sins;

	int*                daemons;
	struct sockaddr_in* daemon_sins;
	int                 daemonSize;
	int                 daemonCount;

	struct sigaction ping_signal;
	struct sigaction old_sigpipe; 
	
	char buf[120];
	
	int one = 1;
	
	sigset_t set;

	/* We are the main thread while probing: receive SIGPIPE & SIGALRM for
	 * interrupted I/O, plus SIGTERM & SIGINT -- at least until we're done. */
	sigemptyset(&set);
	sigaddset(&set, SIGALRM);
	sigaddset(&set, SIGPIPE);
	sigaddset(&set, SIGTERM);
	sigaddset(&set, SIGINT);
	pthread_sigmask(SIG_UNBLOCK, &set, 0);

	daemonSize = 10;
	daemons = (int*)malloc(sizeof(int) * daemonSize);
	daemon_sins = (struct sockaddr_in*)malloc(sizeof(struct sockaddr_in) *
											  daemonSize);
	daemonCount = 0;
	
	if (!daemons || !daemon_sins)
	{
		free(daemons);      /* free(NULL) is a no-op */
		free(daemon_sins);
		perror("Insufficient memory for daemon buffer.");
		return -1;
	}
	
	if ((acceptor = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
	{
		perror("Getting a TCP socket");
		free(daemons);
		free(daemon_sins);
		return -1;
	}
	
	acceptor_sin.sin_family      = AF_INET;
	acceptor_sin.sin_addr.s_addr = INADDR_ANY;
	acceptor_sin.sin_port        = htons(0); /* dynamic port allocation */
	
	if (bind(acceptor, (struct sockaddr*)&acceptor_sin, sizeof(acceptor_sin)) < 0)
	{
		perror("Binding acceptor to tcp port");
		free(daemons);
		free(daemon_sins);
		close(acceptor);
		return -1;
	}
	
	/* Acquire our port name (the dynamic port) */
	acceptor_len = sizeof(acceptor_sin);
	getsockname(acceptor, (struct sockaddr*)&acceptor_sin, &acceptor_len);
	
	/* We need a huge queue b/c the clients will all try to connect at the
	 * same time and we don't want to lose any requests to help us. :-)
	 */
	if (listen(acceptor, 50) < 0)
	{
		perror("Listening on acceptor tcp port");
		free(daemons);
		free(daemon_sins);
		close(acceptor);
		return -1;
	}
	
	if ((probe = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
	{
		perror("Getting a UDP socket");
		free(daemons);
		free(daemon_sins);
		close(acceptor);
		return -1;
	}
	
	probe_sin.sin_family      = AF_INET;
	probe_sin.sin_addr.s_addr = INADDR_ANY;
	probe_sin.sin_port        = htons(0); /* dynamic port allocation */
	
	if (bind(probe, (struct sockaddr*)&probe_sin, sizeof(probe_sin)) < 0)
	{
		perror("Binding to probe udp port");
		free(daemons);
		free(daemon_sins);
		close(acceptor);
		close(probe);
		return -1;
	}
	
	/* Acquire our port name (the dynamic port) */
	probe_len = sizeof(probe_sin);
	getsockname(probe, (struct sockaddr*)&probe_sin, &probe_len);	
	
	/* Allow our probe port to do broadcasts */
	setsockopt(probe, SOL_SOCKET, SO_BROADCAST, &one, sizeof(one));
	
	/* Make the acceptor port non-blocking */
	fcntl(acceptor, F_SETFL, fcntl(acceptor, F_GETFL, 0) | O_NONBLOCK);
	
	printf("  - Dynamic ports acquired: %u/%u\n", 
		   ntohs(probe_sin.sin_port),
		   ntohs(acceptor_sin.sin_port));
	
	/* Setup a handler for our sigpipe that triggers an EIO */
	ping_signal.sa_handler = &pingHandler;
	sigemptyset(&ping_signal.sa_mask);
	ping_signal.sa_flags = 0; /* note: no SA_RESTART */
	sigaction(SIGPIPE, &ping_signal, &old_sigpipe);

	sprintf(buf, "\r  - Probing network, found: ");
	
	/* Probe until 8 consecutive rounds go by with no new daemon */
	counter = 8;
	while (counter)
	{
		printf("%s", buf);
		fflush(stdout);
		
		daemon_sin.sin_family      = AF_INET;
		if (strlen(clientIP))
			daemon_sin.sin_addr.s_addr = inet_addr(clientIP);
		else
			daemon_sin.sin_addr.s_addr = INADDR_BROADCAST;
		daemon_sin.sin_port        = htons(DAEMON_PORT);
	
		/* Probe payload: our magic number and the TCP port to call back */
		packet[0] = htonl(MAGIC);
		packet[1] = htonl(ntohs(acceptor_sin.sin_port));

		if (sendto(probe, &packet, sizeof(packet[0])*2, 0, 
			(struct sockaddr*)&daemon_sin, sizeof(daemon_sin)) 
			!= sizeof(packet[0])*2)
		{
			perror("sendto");
			close(probe);
			close(acceptor);
			for (counter = 0; counter < daemonCount; counter++)
				close(daemons[counter]);
			free(daemons);
			free(daemon_sins);
			sigaction(SIGPIPE, &old_sigpipe, 0);
			return -1;
		}
		
		/* Wait 10ms for a reply (comment used to say 10us) */
		usleep(10000);
		
		daemon_len = sizeof(daemon_sin);
		if ((daemon = accept(acceptor, (struct sockaddr*)&daemon_sin,
							 &daemon_len)) != -1)
		{
			/* Put new connection in blocking write (just in case) */
			fcntl(daemon, F_SETFL, fcntl(daemon, F_GETFL, 0) & ~O_NONBLOCK);

			/* Send the magic number to get them to recognize us */
			magic = htonl(MAGIC);
			if (write(daemon, &magic, sizeof(magic)) != sizeof(magic))
			{
				printf("\n  - %s dropped connection after connect.\n", 
					   addressToName(daemon_sin));
				close(daemon);
				counter--;
			}
			else
			{
				const char* addrName = addressToName(daemon_sin);
			
				/* Keep the running status line under 80 columns */
				if (strlen(addrName) + strlen(buf) > 78)
				{
					printf("\n");
					sprintf(buf, "\r      %s ", addrName);
				}
				else
				{
					strcat(buf, addrName);
					strcat(buf, " ");
				}
			
				printf("%s", buf);
				fflush(stdout);
			
				if (daemonCount == daemonSize)
				{
					/* Grow both arrays; on failure keep whichever grew */
					daemonSize *= 2;
					temp_daemons = (int*)realloc(
                          daemons, sizeof(int)*daemonSize);
					temp_daemon_sins = (struct sockaddr_in*)realloc(
						  daemon_sins, sizeof(struct sockaddr_in)*daemonSize);

					if (!temp_daemons || !temp_daemon_sins)
					{
						perror("Insufficient memory for daemon buffer\n");

						if (temp_daemon_sins) daemon_sins = temp_daemon_sins;
						if (temp_daemons    ) daemons     = temp_daemons;
						
						close(probe);
						close(acceptor);
						for (counter = 0; counter < daemonCount; counter++)
							close(daemons[counter]);

						free(daemons);
						free(daemon_sins);

						sigaction(SIGPIPE, &old_sigpipe, 0);
						return -1;
					}
					else
					{
						daemons     = temp_daemons;
						daemon_sins = temp_daemon_sins;
					}
				}
				
				daemons    [daemonCount] = daemon;
				daemon_sins[daemonCount] = daemon_sin;
				daemonCount++;
			
				counter = 8; /* a hit resets the no-reply budget */
				printf("%s", buf);
			}
		}
		else
		{
			counter--;
		}
	}

	if (close(acceptor) == -1) perror("closing acceptor");
	if (close(probe)    == -1) perror("closing probe");
	
	printf(".\n");

	fflush(stdout);

	/* Upload the clone data to every daemon that answered */
	for (counter = 0; counter < daemonCount; counter++)
	{
		printf("\r  - Transmitting data: %i/%i", counter, daemonCount);
		fflush(stdout);
		
		if (writePz             (&Pz,   daemons[counter]) == -1 ||
		    writeProj           (&Proj, daemons[counter]) == -1 ||
		    writeCpM            (&Cp,   daemons[counter]) == -1 ||
		    writeAcedata        (       daemons[counter]) == -1 ||
		    writeMatrix         (       daemons[counter]) == -1 ||
		    writeMarkerdata     (       daemons[counter]) == -1 ||
		    writeDistrib        (       daemons[counter]) == -1 ||
		    writeLoadCzFastCache(       daemons[counter]) == -1)
		{
			printf("\r  - Lost a node during upload: %-40s\n", 
				   addressToName(daemon_sins[counter]));
			
			close(daemons[counter]);

			/* Compact the arrays: move the last entry into this slot */
			daemonCount--;
			daemons    [counter] = daemons    [daemonCount];
			daemon_sins[counter] = daemon_sins[daemonCount];

			counter--; /* balance the increment */
		}
		else
		{
			/* Successful transmission - make this daemon's socket
			 * non-blocking.
			 * BUG FIX: this previously operated on 'daemon' (the last
			 * socket accepted above), not on daemons[counter], so the
			 * uploaded sockets were never made non-blocking. */
			fcntl(daemons[counter], F_SETFL,
				  fcntl(daemons[counter], F_GETFL, 0) | O_NONBLOCK);
		}
	}
	printf("\r  - Done transmitting data.     \n");
	/* Restore old sig pipe handler */
	sigaction(SIGPIPE, &old_sigpipe, 0);

	/* Shrink the arrays to their final size and set the return values */
	*daemonOut = daemons = (int*)realloc(daemons, sizeof(int) * daemonCount);
	*addrOut   = daemon_sins = (struct sockaddr_in*)realloc(daemon_sins,
									sizeof(struct sockaddr_in) * daemonCount);

	return daemonCount;
}

/* These queueing routines are used to support the localDispatcher & 
 * daemonPusher. It is my hope that some day the slaveDispatcher's ugly
 * wrap around queue be rewritten to use this, but I doubt I will be here
 * long enough to finish it myself. :-(
 *
 * Notice that we are not locking the matrix rows. This is b/c at most one
 * queue ever has each row and the queues are always held by only one thread.
 */

/* Push 'clone' onto the front of the intrusive queue rooted at *queueHead.
 * The links are the nextQueued fields inside ZZ.matrix[]. */
static void enqueue(int* queueHead, int clone)
{
	int oldHead = *queueHead;

	ZZ.matrix[clone].nextQueued = oldHead;
	*queueHead = clone;
}

/* Pop the front clone # off the queue, or return -1 if it is empty.
 * The popped entry's link is cleared to -1. */
static int extract(int* queueHead)
{
	int front = *queueHead;

	if (front != -1)
	{
		*queueHead = ZZ.matrix[front].nextQueued;
		ZZ.matrix[front].nextQueued = -1;
	}

	return front;
}

/* Unlink 'clone' from anywhere inside the queue.
 * Returns 0 on success, -1 if the clone was not queued. */
static int purge(int* queueHead, int clone)
{
	int* link = queueHead;

	/* Walk the links until 'link' points at the slot holding 'clone' */
	while (*link != clone)
	{
		if (*link == -1) return -1; /* hit the end: not queued */
		link = &ZZ.matrix[*link].nextQueued;
	}

	/* Splice it out and clear its link */
	*link = ZZ.matrix[clone].nextQueued;
	ZZ.matrix[clone].nextQueued = -1;

	return 0;
}

/* Useful to quickly kill off all our threads. */
/* Useful to quickly kill off all our threads. */
/* Hands a stop order (-1) to 'numberThreads' workers, one at a time, as
 * each announces it is waiting.
 *
 * NOTE(review): the pthread_cond_wait below is only legal if the caller
 * already holds workerWaitingCondMutex -- the dispatchers that call this
 * do. Confirm before calling from anywhere else.
 */
void mergeThreads(int numberThreads)
{
	int thread;
	
	for (thread = 0; thread < numberThreads; thread++)
    {
		/* Wait till someone asks for orders */
		pthread_cond_wait(&workerWaitingCond, &workerWaitingCondMutex);
		
		/* Dispatch stop order */
		ordersValue = -1;
		
		/* Tell the worker its orders are ready */
		pthread_mutex_lock(&ordersCondMutex);
		pthread_cond_signal(&ordersCond);
		pthread_mutex_unlock(&ordersCondMutex);
	}
}    

/* We dispatch clones to local worker threads. This dispatcher is used by fpc
 * not by fpcd.
 */
/* We dispatch clones to local worker threads. This dispatcher is used by fpc
 * not by fpcd.
 *
 * numberThreads is the number of local worker threads; queueHead is the
 * head of the clone queue to hand out. Runs until every worker (plus the
 * daemonPusher, if running) has been told to stop. Returns STATUS_OK or
 * the error status that ended the run.
 */
int localDispatcherProcedure(int numberThreads, int queueHead)
{
	int orders;
	int totalOrders = ZZ.size;
	
	float percentStatus = 0.01;
	float percentApprox = percentStatus;
	int   nextStatus;
	int   threadsAlive;
	int   myHead = queueHead;
	
	int error;

	sigset_t set;

	/* We are supposed to ignore signals SIGPIPE & SIGALRM */
	sigemptyset(&set);
	sigaddset(&set, SIGALRM);
	sigaddset(&set, SIGPIPE);
	pthread_sigmask(SIG_BLOCK, &set, 0);
	/* We are supposed to receive signals SIGINT & SIGTERM */
	sigemptyset(&set);
	sigaddset(&set, SIGTERM);
	sigaddset(&set, SIGINT);
	pthread_sigmask(SIG_UNBLOCK, &set, 0);

	/* How many threads ask us questions? */
	threadsAlive = numberThreads;
	if (daemonPusherRunning) threadsAlive++;

	/* Prepare statistics counter */
#ifndef NO_DAEMON_LOG
	initStats();
#endif	
	/* No errors yet */
	error = STATUS_OK;
	
	/* Compute next status update */
	nextStatus = totalOrders * (1.0 - sqrt(1.0 - percentApprox));
	
	/* Once we have this lock, no signals about new workers waiting will
	 * slip by us.
	 */
	pthread_mutex_lock(&workerWaitingCondMutex);

	/* Let people ask for orders now */
	pthread_mutex_unlock(&acquireOrdersMutex);
	
	printf("  - No clones dispatched: 0.00%% complete");
	fflush(stdout);
	
	while (threadsAlive > 0)
	{
		/* Wait till someone asks for orders */
		pthread_cond_wait(&workerWaitingCond, &workerWaitingCondMutex);
		
		/* If have an error, our orders are to stop */
		if (error != STATUS_OK)
		{
			orders = -1;
		}
		else
		{
			orders = extract(&myHead);
		}

		/* Determine behaviour based on client status */
		switch (previousStatus)
		{
		  case STATUS_OK:
			/* Dispatch the order */
			ordersValue = orders;
			/* If we told someone to stop, decrement the alive counter */
			if (ordersValue == -1) threadsAlive--;
			break;
			
		  case STATUS_RETURNING_ORDER:
			/* We don't need the order we extracted, enqueue it */
			if (orders != -1) enqueue(&myHead, orders);
			/* Restore the order that we're being given */
			enqueue(&myHead, ordersValue);
			break;

		  case STATUS_FAIL_NETWORK:
			/* This person was crashing; don't give them a clone. */
			printf("Local dispatcher watched a worker die. Recovered.\n");
			if (orders != -1) enqueue(&myHead, orders);
			threadsAlive--;
			ordersValue = -1;
			/* BUG FIX: a missing break here used to fall through into the
			 * unrecoverable case below, flagging STATUS_FAIL_ALGORITHM and
			 * decrementing threadsAlive twice for a mere network failure,
			 * contradicting the "Recovered" message above. */
			break;
		  
		  /* STATUS_FAIL_ALGORITHM */
		  default:
			printf("Local dispatcher watched an algorithmic error occur. Cannot recover.\n");
			error = STATUS_FAIL_ALGORITHM;
			threadsAlive--;
			ordersValue = -1;
		}
		
		/* Tell the worker its orders are ready */
		pthread_mutex_lock(&ordersCondMutex);
		pthread_cond_signal(&ordersCond);
		pthread_mutex_unlock(&ordersCondMutex);
		
		/* Check if we should update status for user */
		/* Note: This is an approximation b/c:
		 *   -> It assumes cost is exactly n^2.
		 *   -> It considers orders sent to daemons as completed.
		 *      (so any buffered clones are counted as done)
		 *   -> Any orders that have been returned are not counted.
		 *   -> For very small data sets (< 100 clones), the percentage
		 *      bar can't move fast enough, but who can see that? :-)
		 * This means it has an initial burst, and a delay at 100%
		 * plus has some random noise.
		 *
		 * For a user to watch, however, it is pretty close.
		 * I timed it and never noticed an difference in % ticks of more
		 * than 0.5s (I used a stop watch. :-)
		 */
		if (orders >= nextStatus)
		{
			percentApprox += percentStatus;
			nextStatus = totalOrders * (1.0 - sqrt(1.0 - percentApprox));
			
			printf("\r  - Clones up to #%i have been dispatched: %02.0f%% complete", 
				orders, percentApprox*100);
			fflush(stdout);
		}
   	}
	
	if (extract(&myHead) == -1)
	{
		printf("\r  - All clones have been dispatched: 100.00%% complete.                      \n");

		printf("  - All threads told to terminate.                          \n");
	}
	else
	{
		printf("\r  - All our workers died, needless to say the job failed.\n");
		error = STATUS_FAIL_ALGORITHM;
	}

	pthread_mutex_unlock(&workerWaitingCondMutex);

	/* Finish with statistics */
#ifndef NO_DAEMON_LOG
	closeStats();
#endif

	return error;
}

/* The dispatcher is what gives out orders (clone #s) to the worker threads.
 * This dispatcher retrieves them from the network and keeps a circular
 * buffer so that we don't wait on the network.
 *
 * If you get a large number of cache missing in your log files for a job, you
 * might try increasing the BUFFER_FACTOR or decreasing worker thread count 
 * on the fpc client.
 *
 * lock is a mutex used to protect the socket 'sock'. We lock before we
 * read or write to it.
 */
/* FIXME: use the new queueing strategy from recording clones to process */
/* FIXME: use the new queueing strategy from recording clones to process */
/* The dispatcher running on an fpcd daemon. Retrieves clone orders from
 * the client over TCP/IP and keeps a circular buffer so workers don't
 * wait on the network.
 *
 * threadCount is the number of local workers; sock is the client link;
 * lock protects sock (locked around every read/write).
 * Returns a STATUS_FAIL_* code on error, otherwise the number of cache
 * misses (buffer underruns) that occurred -- increase BUFFER_FACTOR or
 * reduce the fpc client's worker count if that number is large.
 */
int slaveDispatcherProcedure(int threadCount, int sock, pthread_mutex_t* lock)
{
	int* orders;
	int  ordersSize;
	int  orderIndex;
	int  orderRemain;
	int  requestsPending;
	int  working;
	int  error;
	int  cacheMisses;
	int  threadsAlive;

	struct sigaction old_sigpipe;
	struct sigaction ping_signal;

	/* This determines how much of an order buffer this node keeps to combat
	 * network latency. A higher number for slower connection.
	 */
	const int BUFFER_FACTOR = 3;
 
	sigset_t set;

	/* We are supposed to receive signals:
	 *  SIGPIPE & SIGALRM & SIGTERM & SIGINT 
	 */
	sigemptyset(&set);
	sigaddset(&set, SIGALRM);
	sigaddset(&set, SIGPIPE);
	sigaddset(&set, SIGTERM);
	sigaddset(&set, SIGINT);
	pthread_sigmask(SIG_UNBLOCK, &set, 0);
	
	threadsAlive = threadCount;
	cacheMisses = 0;
	error = STATUS_OK;

#ifndef NO_DAEMON_LOG
	initStats();
#endif
	
	/* Once we have this lock, no signals about new workers waiting will
	 * slip by us.
	 */
	pthread_mutex_lock(&workerWaitingCondMutex);
	
	/* Let people ask for orders now */
	pthread_mutex_unlock(&acquireOrdersMutex);

	/* Obtain a buffer that is threadCount * buffer factor */
	ordersSize = threadCount * BUFFER_FACTOR;
	orders = (int*)malloc(ordersSize * sizeof(int));

	if (!orders)
	{
		mergeThreads(threadsAlive);
		pthread_mutex_lock(&acquireOrdersMutex);
		pthread_mutex_unlock(&workerWaitingCondMutex);
		return STATUS_FAIL_ALGORITHM;
	}
	
	orderRemain = 0;		/* We presently have no orders in the buffer */
	requestsPending = 0;	/* We have not requested any more orders */
	orderIndex = 0;         /* We start at the start :-) */
	working = 0;            /* We haven't started working yet */

	/* Setup a handler for our alarm/sigpipe that triggers an EIO */
	ping_signal.sa_handler = &pingHandler;
	sigemptyset(&ping_signal.sa_mask);
	ping_signal.sa_flags = 0; /* note: no SA_RESTART */
 
	/* Register this handler on sig_pipe */
	sigaction(SIGPIPE, &ping_signal, &old_sigpipe);
	
	/* Send out some requests (BUFFER_FACTOR) many */
	for (requestsPending = 0; requestsPending < BUFFER_FACTOR; requestsPending++)
	{
		u_int32_t request[2];
		request[0] = htonl(DAEMON_WANTS_CLONES);
		request[1] = htonl(threadCount);
		pthread_mutex_lock(lock);
		if (write(sock, &request[0], sizeof(u_int32_t)*2) != sizeof(u_int32_t)*2)
	    {
			pthread_mutex_unlock(lock);
			error = STATUS_FAIL_NETWORK;
			requestsPending = 0;
			break;
		}
		pthread_mutex_unlock(lock);
	}
	
	/* This loop allows us to dispatch orders to the workers, request
	 * orders from the client, and receive orders from the client without
	 * blocking. The only block cases is:
	 *	Response fragmentation - we block to read the whole response
	 *
	 * This function should use minimal (read: almost zero) CPU time
	 * for normal operation (when workers are working). When we exhaust
	 * our buffer, an error is generated and we block 1/10th second (bad!).
	 * This delay is to prevent us from sucking all CPU time endlessly
	 * testing for a response, but it is a major performance hit.
	 * Increase BUFFER_FACTOR to the point where this doesn't happen.
	 * Making BUFFER_FACTOR too high however will make this system the
	 * bottleneck on the processing during the final calculations (bad).
	 */
	while (orderRemain || requestsPending)
	{
		if (requestsPending)
		{
			u_int32_t reply;
			u_int32_t count;
			int i;
			
			/* Check for a reply waiting - non blocking read */
			pthread_mutex_lock(lock);
			i = read(sock, &reply, sizeof(reply));
			reply = ntohl(reply);
			
			/* Uh oh! The ports gone bad - bail out */
			if (i != sizeof(reply) && errno != EAGAIN)
			{
				syslog(LOG_WARNING, "client quit with requests pending - restarting\n");
				error = STATUS_FAIL_NETWORK;
				pthread_mutex_unlock(lock);
				break;
			}
			
			if (i == sizeof(reply) && reply == DAEMON_HERE_ARE_CLONES)
			{
				working = 1; /* We are now working - any underflows henceforth are bad */
				
				/* Reset the non-block flag - we want to block */
				fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) & ~O_NONBLOCK);

				if (readLoop(sock, &count, sizeof(count)) != sizeof(count))
				{
					syslog(LOG_WARNING, "client quit mid clone packet - restarting\n");
					error = STATUS_FAIL_NETWORK;
					/* BUG FIX: restore non-blocking mode and release the
					 * socket lock before bailing out -- this path used to
					 * break out of the loop with 'lock' still held (and
					 * the socket left blocking). */
					fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
					pthread_mutex_unlock(lock);
					break;
				}
				count = ntohl(count);

				/* We want to block. Note that we only block
				 * once data is known to be ready - we still
				 * run happily in our buffer otherwise
				 */
				for (i = 0; i < count; i++)
				{
					u_int32_t order;

					if (readLoop(sock, &order, sizeof(order)) != sizeof(order))
				    {
						syslog(LOG_WARNING, "client quit mid clone packet - restarting\n");
						error = STATUS_FAIL_NETWORK;
						break;
					}

					orders[(orderIndex + orderRemain + i) % ordersSize] = ntohl(order);
				}
				
				if (i != count)
			    {
				    fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
					pthread_mutex_unlock(lock);
					break; /* propogate bad socket break up */
				}

				orderRemain += i;
				
				/* Put us back in non-blocking mode */
				fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
				
				requestsPending--;
			}
			
			pthread_mutex_unlock(lock);
		}

		if (orderRemain)
		{
			/* Wait till someone asks for orders */
			pthread_cond_wait(&workerWaitingCond, &workerWaitingCondMutex);

			/* If last status was bad, we want to kill worker */
			if (previousStatus != STATUS_OK)
		    {
			  ordersValue = -1;
			  threadsAlive--;
			  error = previousStatus;
			}
			else
		    {
			  /* Dispatch an order */
			  ordersValue = orders[orderIndex];
			}
		
			/* Tell the worker its orders are ready */
			pthread_mutex_lock(&ordersCondMutex);
			pthread_cond_signal(&ordersCond);
			pthread_mutex_unlock(&ordersCondMutex);

			/* Last status was bad, bail out of dispatcher procedure */
			if (previousStatus != STATUS_OK) break;
			
			/* Now, increase the index and possibly request more 
			 * orders if we cross a thread count boundary.
			 */
			orderIndex++;
			orderRemain--;
			
			if (orderIndex % threadCount == 0)
			{
				u_int32_t request[2];
				
				/* If we are divisible by threadCount, maybe also div
				 * by size of buffer, so wrap it.
				 */
				orderIndex %= ordersSize;
				
				/* We crossed a boundary, request more orders */
				request[0] = htonl(DAEMON_WANTS_CLONES);
				request[1] = htonl(threadCount);

				pthread_mutex_lock(lock);
				fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) & ~O_NONBLOCK);

				if (write(sock, &request[0], sizeof(u_int32_t)*2)
					!= sizeof(u_int32_t)*2)
				{
				    syslog(LOG_WARNING, "client quit before clone request - restarting\n");
				    error = STATUS_FAIL_NETWORK;
					fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
					pthread_mutex_unlock(lock);
				    break;
				}

				fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
				pthread_mutex_unlock(lock);
				
				requestsPending++;
			}
		}
		else
		{
		    u_int32_t ping;
			
			/* We're out of orders, have pending requests and are working. */
			if (working && requestsPending)
			{
				cacheMisses++;
				/* syslog(LOG_NOTICE, "worker buffer exhausted\n"); */
			}
			
			/* Send a ping to the client to force an ACK */
			ping = htonl(DAEMON_KEEP_ALIVE_PING);
			pthread_mutex_lock(lock);
		  	fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) & ~O_NONBLOCK);

		   	if (write(sock, &ping, sizeof(u_int32_t)) != sizeof(u_int32_t))
			{
				syslog(LOG_WARNING, "client quit during daemon buffer exhaustion - restarting\n");
				error = STATUS_FAIL_NETWORK;
				fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
				pthread_mutex_unlock(lock);
				break;
			}
				
		   	fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
		   	pthread_mutex_unlock(lock);
			
			usleep(100000); /* 100ms */
		}
	}
	
	mergeThreads(threadsAlive);

	free(orders);
	pthread_mutex_lock(&acquireOrdersMutex);
	pthread_mutex_unlock(&workerWaitingCondMutex);

	sigaction(SIGPIPE, &old_sigpipe, 0);

#ifndef NO_DAEMON_LOG
	closeStats();
#endif

	if (error) return error;
	return cacheMisses;
}

/* Tear down daemon link i: close its socket, hand every clone it still
 * owed back to the dispatcher, and compact the parallel arrays by moving
 * the last live entry into the vacated slot.
 */
static void daemonCleanupLinkHelper(int* connectionCount, 
									struct daemonPusherArg* d,
									int* queueHeads,
									int i)
{
    int pending;
    int last;

	/* Shut down the TCP connection to this daemon. */
	if (close(d->daemons[i]) == -1)
	    perror("closing daemon link");

	/* Drain the daemon's outstanding-job queue back into the dispatcher. */
	for (pending = extract(&queueHeads[i]);
		 pending != -1;
		 pending = extract(&queueHeads[i]))
	{
	    printf("\rClone %i has been restored to the queue\n", pending);
		pushBackOrders(pending);
	}

	/* Swap the final entry into slot i and shrink the connection set. */
	last = --(*connectionCount);
	d->daemons    [i] = d->daemons    [last];
	d->daemon_sins[i] = d->daemon_sins[last];
	queueHeads    [i] = queueHeads    [last];
}

/* This is the thread which talks to the fpcd server daemons.
 * It gets clones from the dispatcher running in this process and
 * ships them out over TCP/IP to the fpcd server daemons.
 *
 * arg points at a struct daemonPusherArg; a private copy is taken so the
 * caller's storage need not outlive thread startup.  Returns the
 * accumulated status (STATUS_OK / STATUS_FAIL_*) smuggled through void*.
 */
void* daemonPusherProcedure(void* arg)
{
    struct daemonPusherArg d = *(struct daemonPusherArg*)arg;
	int i, j, k;         /* loop variables */
	fd_set readfds;      /* socket set for select */
	int connectionCount; /* Number of daemon links */
	int highest;         /* for select */
	int done;            /* boolean. true once all clones dispatched */
	int error;           /* accumulated error status */
    int get;             /* number of bytes gotten during a read */

	/* Used when reading daemon requests */
	u_int32_t  numClones;
	u_int32_t  request;

	/* These two are used when reading daemon completion info */
	u_int32_t  cloneNumber;
	u_int32_t  nodeCount;

	/* This is an array of queue heads for the daemons */
	int* queueHeads;

	/* These are used during sending clones */
	u_int32_t header[2];
	u_int32_t order;
	int       goodSoFar;

	struct sigaction ping_signal;
	struct sigaction old_sigpipe;
	struct sigaction old_alarm;
	
	sigset_t set;
	/* We are supposed to allow signals SIGPIPE & SIGALRM */
	sigemptyset(&set);
	sigaddset(&set, SIGALRM);
	sigaddset(&set, SIGPIPE);
	pthread_sigmask(SIG_UNBLOCK, &set, 0);
	/* We are supposed to ignore signals SIGTERM & SIGINT */
	sigemptyset(&set);
	sigaddset(&set, SIGINT);
	sigaddset(&set, SIGTERM);
	pthread_sigmask(SIG_BLOCK, &set, 0);

	connectionCount = d.daemonCount;
	queueHeads = (int*)malloc(sizeof(int) * connectionCount);
	if (!queueHeads) return (void*)STATUS_FAIL_ALGORITHM;

	/* Setup a handler for our sigpipe that triggers an EIO */
	ping_signal.sa_handler = &pingHandler;
	sigemptyset(&ping_signal.sa_mask);
	ping_signal.sa_flags = 0; /* note: no SA_RESTART */
   	sigaction(SIGPIPE, &ping_signal, &old_sigpipe);

	/* Setup a handler for the alarm */
	sigaction(SIGALRM, &ping_signal, &old_alarm);

	highest = 0;
	for (i = 0; i < connectionCount; i++)
	{
		/* Record highest handle + 1 */
		if (d.daemons[i] >= highest) highest = d.daemons[i] + 1;
		/* Set the queues initially empty */
		queueHeads[i] = -1;
	}
	
	done = 0;
	error = STATUS_OK;
	
	needPing = 0;
	alarm(5); /* schedual a ping in 5 seconds */
	while (connectionCount)
	{
	    if (needPing)
	    {  
			/* Probe every link; any whose write fails is torn down and
			 * its clones are reclaimed.
			 */
			for (i = 0; i < connectionCount; i++)
		    {
			    request = htonl(DAEMON_KEEP_ALIVE_PING);
				if (write(d.daemons[i], &request, sizeof(request))
					!= sizeof(request))
			    {	
			        printf("\n  - Ping reveals dead daemon: %s - Recovered.\n",
						   addressToName(d.daemon_sins[i]));
					
					/* OBSOLETE: recovery of clones makes this unneeded */
					/* error = STATUS_FAIL_NETWORK; */

					daemonCleanupLinkHelper(&connectionCount, &d, 
											queueHeads, i);
					
					i--; /* balance the i++ of the loop */
				}
			}

			needPing = 0;
			alarm(5); /* ping again in 5 seconds */

			continue;
		}
	    alarm(0); /* JHAT - turn off the alarm */
	    //alarm(5);

		FD_ZERO(&readfds);
		for (i = 0; i < connectionCount; i++)
			FD_SET(d.daemons[i], &readfds);
		
		/* The handlers are installed without SA_RESTART, so SIGALRM or
		 * SIGPIPE can interrupt select with EINTR, leaving readfds
		 * unspecified.  Restart the loop so a pending needPing is
		 * honoured instead of polling sockets blindly.
		 */
		if (select(highest, &readfds, 0, 0, 0) == -1)
			continue;
		
		for (i = 0; i < connectionCount; i++)
		{
			get = read(d.daemons[i], &request, sizeof(request));
			
			if (get == -1 && errno != EAGAIN)
			{
			    printf("\n  - Node quit without handshake: %s - Recovered.\n",
					   addressToName(d.daemon_sins[i]));

				/* OBSOLETE: We reclaim their clones - no problem. */
				/* error = STATUS_FAIL_NETWORK; */

				daemonCleanupLinkHelper(&connectionCount, &d, 
										queueHeads, i);

				break;
		    }

			if (get == sizeof(request))
			{ /* Go blocking to deal with request */
				fcntl(d.daemons[i], F_SETFL, 
					  fcntl(d.daemons[i], F_GETFL, 0) & ~O_NONBLOCK);
				
				request = ntohl(request);
				
				switch (request)
				{
				    case DAEMON_KEEP_ALIVE_PING:
		  			    break;

					case DAEMON_DONE_CLONE:
					    if (read(d.daemons[i], &cloneNumber,
								 sizeof(cloneNumber)) == sizeof(cloneNumber)) 
						{
							cloneNumber = ntohl(cloneNumber);
							
							if (read(d.daemons[i], &nodeCount, 
									 sizeof(nodeCount)) == sizeof(nodeCount)) 
							{
								nodeCount = ntohl(nodeCount);
								
								/* Pull the row data off the socket and
								 * retire the clone from this daemon's
								 * outstanding queue.
								 */
								Zcreate_node_from_socket(d.daemons[i], 
												   cloneNumber, nodeCount);
								purge(&queueHeads[i], cloneNumber);
								break;
							}	
						}

					  /* Fall through if the done clone data is missing */
					case DAEMON_REPORTING_ERROR:
  					    printf("\n  - Calculation error on: %s - NOT RECOVERABLE\n",
							   addressToName(d.daemon_sins[i]));
						error = STATUS_FAIL_ALGORITHM;
						daemonCleanupLinkHelper(&connectionCount, &d, 
										queueHeads, i);

						break;

				    case DAEMON_WANTS_TO_QUIT:
						if (!done)
						{
							printf("\n  - Premature exit by %s - Recovered.\n",
								   addressToName(d.daemon_sins[i]));
						}

						daemonCleanupLinkHelper(&connectionCount, &d, 
										queueHeads, i);

						break;

					case DAEMON_WANTS_CLONES:
						/* NOTE(review): this read is unchecked; on a
						 * short read numClones is stale/garbage.  Left
						 * as-is since the socket is blocking here.
						 */
						read(d.daemons[i], &numClones, sizeof(numClones));
						numClones = ntohl(numClones);

						if (!done)
						{
							/* j starts at 2 so that j - 2 is always the
							 * number of clones actually queued.
							 */
							for (j = 2; j < numClones + 2; j++)
							{
							    order = acquireWorkerOrders(error);
								
								if (order == (u_int32_t)-1)
								{
									done = 1;
									break;
								}

								/* Decrement counter b/c remote jobs 
								 * don't count.
								 * BUG FIX: these decrements used to run
								 * BEFORE the -1 check above, so the
								 * exhaustion sentinel corrupted jobCount
								 * and made calcCount underflow via
								 * ZZ.size - (u_int32_t)-1.
								 */
								pthread_mutex_lock(&acquireOrdersMutex);
								jobCount--;
								calcCount -= ZZ.size - order;
								pthread_mutex_unlock(&acquireOrdersMutex);

								enqueue(&queueHeads[i], order);
							}
							header[0] = htonl(DAEMON_HERE_ARE_CLONES);
							header[1] = htonl(j - 2);
						}
						else
						{
						    header[0] = htonl(DAEMON_HERE_ARE_CLONES);
							header[1] = htonl(0);
							j = 2;
						}
						
						goodSoFar = write(d.daemons[i], header, 
										  sizeof(u_int32_t) * 2)
						             == sizeof(u_int32_t) * 2;

						/* Walk the per-daemon queue (linked through
						 * ZZ.matrix[].nextQueued) and ship each clone #.
						 */
						for (k = 0, order = queueHeads[i]; 
							 k < j - 2; 
							 k++, order = ZZ.matrix[order].nextQueued)
						{
							if (!goodSoFar) break;
							order = htonl(order);
							goodSoFar = write(d.daemons[i], &order,
											  sizeof(u_int32_t))
							             == sizeof(u_int32_t);
							order = ntohl(order);
						}
				        
						if (!goodSoFar)
						{
						    printf("\n  - Daemon death on send: %s - Recovered.\n",
								   addressToName(d.daemon_sins[i]));
							
							daemonCleanupLinkHelper(&connectionCount, &d, 
													queueHeads, i);
						}
						else
						{
							fcntl(d.daemons[i], F_SETFL, 
								  fcntl(d.daemons[i], F_GETFL, 0) | 
								  O_NONBLOCK);
						}
						
						break;
						
					default:
						fprintf(stderr, 
								"Unable to service unknown request %u from daemon.\n", 
								request);
				}
				
				/* Return to non-blocking mode */
				fcntl(d.daemons[i], F_SETFL, fcntl(d.daemons[i], F_GETFL, 0) | O_NONBLOCK);
				/* We've serviced the request */
				break;
			}
		}
	}
	
	free(queueHeads);
   	sigaction(SIGPIPE, &old_sigpipe, 0);
	sigaction(SIGALRM, &old_alarm,   0);
	
	return (void*)error;
}

#ifndef NO_DAEMON_LOG
/* Append a STOP_JOB record to the shared job log.
 *
 * sig is the delivering signal number when installed as a SIGINT/SIGTERM
 * handler, or -1 when called directly at normal shutdown.  For a real
 * signal the process exits with status 1 afterwards.
 *
 * NOTE(review): as a signal handler this calls fopen/fprintf, which are
 * not async-signal-safe; kept as-is to match the existing design.
 */
void closeJob(int sig)
{
	FILE*  logFile;
	char   timeBuf[100];
	time_t now;
	struct passwd* pw;

	/* Write job completion to disk */
	if ((logFile = fopen("/var/log/fpcd/jobs.log", "a")) != (FILE*)0)
	{
		/* flock(fileno(logFile), LOCK_EX); */
		lockf(fileno(logFile), F_LOCK, 0);
		now = time(0);
		strftime(timeBuf, sizeof(timeBuf), "%Y/%m/%d-%H:%M:%S", localtime(&now));
		/* BUG FIX: getpwuid() may return NULL (uid missing from passwd);
		 * dereferencing it unchecked crashed the handler.  Casts match
		 * the %li/%i conversions since time_t/pid_t widths vary.
		 */
		pw = getpwuid(getuid());
		fprintf(logFile, "%li %s STOP_JOB %s %i\n", 
				(long)now, timeBuf, pw ? pw->pw_name : "unknown", (int)getpid());
		/* flock(fileno(logFile), LOCK_UN);*/
		lockf(fileno(logFile), F_ULOCK, 0);
		fclose(logFile);
	}

	if (sig != -1) exit(1);
}
#endif

/* This will build the matrix for fpc in a distributed manner.
 * This is the main entry point into the distributed code.
 *
 * Returns STATUS_OK on success, a STATUS_FAIL_* code on failure, or -1
 * if the initial worker-table allocation fails.
 */
int parallel_node_creation()
{
	pthread_t* workers;
	pthread_t  daemonPusherThread;
	int worker;
	void* value;
	int error;
	int tmp;
	struct daemonPusherArg daemons;
	
	int numberThreads = 0; /* sysconf(_SC_NPROCESSORS_CONF); */
	int queueHead;

#ifndef NO_DAEMON_LOG
	/* Log-only locals; scoped here so they do not trigger unused-variable
	 * warnings when NO_DAEMON_LOG is defined.
	 */
	FILE*  logFile;
	char   timeBuf[100];
	time_t now;
	struct passwd* pw;

	signal(SIGINT,  &closeJob);
	signal(SIGTERM, &closeJob);
	/* Write job startup to disk */
	if ((logFile = fopen("/var/log/fpcd/jobs.log", "a")) != (FILE*)0)
	{
		/* flock(fileno(logFile), LOCK_EX); */
		lockf(fileno(logFile), F_LOCK, 0);
		now = time(0);
		strftime(timeBuf, sizeof(timeBuf), "%Y/%m/%d-%H:%M:%S", localtime(&now));
		/* Guard getpwuid: it may return NULL for an unknown uid. */
		pw = getpwuid(getuid());
		fprintf(logFile, "%li %s START_JOB %s %i %i\n", 
				(long)now, timeBuf, pw ? pw->pw_name : "unknown",
				(int)getpid(), numberThreads);
		/* flock(fileno(logFile), LOCK_UN); */
		lockf(fileno(logFile), F_ULOCK, 0);
		fclose(logFile);
	}
#endif

    /* Build the cache table before we start so the threads don't thrash it */
    rebuildLoadCzFastCache();

	error = STATUS_OK;
  
	printf("\n");
	printf("Entered parallel_node_creation:\n");
	printf("  - Preparing memory and environment: ");
	fflush(stdout);
	
	/* malloc(0) may legally return NULL; request at least one slot so a
	 * zero thread count is not misreported as out-of-memory.
	 */
	if (!(workers = (pthread_t*)malloc(sizeof(pthread_t) *
									   (numberThreads > 0 ? numberThreads : 1))))
	{
		return -1;
	}
	
	/* Initialize mutexes and conditions */
	initMutexes();	
	printf("done\n");
	
	/* Look for client processors */
	daemons.daemonCount = findDaemons(&daemons.daemons, &daemons.daemon_sins);
	
	/* Quit on error - release the worker table we already own */
	if (daemons.daemonCount == -1)
	{
		free(workers);
		return STATUS_FAIL_ALGORITHM;
	}
	
	printf("  - Running serial matrix prepass: ");
	fflush(stdout);
	if (Zcreate_node_prepare(&queueHead) == -1)
	{
		free(workers);
		return STATUS_FAIL_ALGORITHM;
	}
	printf("done\n");
	
	if (daemons.daemonCount > 0)
	{
		printf("  - Launching the daemonPusher\n");
		/* pthread_create returns 0 on success or a positive error number;
		 * on failure run without a pusher rather than joining a garbage
		 * thread id later.
		 */
		if (pthread_create(&daemonPusherThread, 0, daemonPusherProcedure,
						   &daemons) == 0)
		{
			daemonPusherRunning = 1;
		}
		else
		{
			printf("CRITICAL ERROR: cannot create daemonPusher thread!\n");
			daemonPusherRunning = 0;
			free(daemons.daemons);
		}
	}
	else
	{
		daemonPusherRunning = 0;
	}
	
	printf("  - Creating %i threads: ", numberThreads);
	fflush(stdout);
	for (worker = 0; worker < numberThreads; worker++)
	{
		/* BUG FIX: pthread_create returns 0 or a POSITIVE error number,
		 * never a negative value, so the old '< 0' test could not detect
		 * failure.
		 */
		if (pthread_create(&workers[worker], 0, localWorkerProcedure, 0) != 0)
		{
			printf("CRITICAL ERROR: cannot create threads!\n");
			return STATUS_FAIL_ALGORITHM;
		}
	}
	
	printf("done\n");
	
	printf("  - Launching the dispatcher\n");
	tmp = localDispatcherProcedure(numberThreads, queueHead);
	if (tmp != STATUS_OK) error = tmp;
	
	printf("  - Dispatcher has completed assigning tasks.\n");
	printf("  - Joining threads: ");
	fflush(stdout);
	
	for (worker = 0; worker < numberThreads; worker++)
	{
		pthread_join(workers[worker], &value);
		/* Thread results are int status codes smuggled through void*. */
		if ((int)value < 0) error = (int)value;
	}

	if (daemonPusherRunning)
	{
	  	pthread_join(daemonPusherThread, &value);	
		if ((int)value < 0) error = (int)value;
		free(daemons.daemons);
	}
	
	printf("done\n");
	
	/* Destroy mutexes and conditions */
	destroyMutexes();
	
	free(workers);
	
	printf("  - Running serial node finishing routine: ");
	Zcreate_node_finish();
	printf("done\n");
	
	printf("  - Parallel sparse matrix build %s.\n",
		error!=STATUS_OK?"FAILED":"complete");

	printf("sness-In parallel node creation - end\n");

#ifndef NO_DAEMON_LOG
	closeJob(-1);
#endif
	printf("sness-In parallel node creation - end1\n");
	signal(SIGINT, SIG_DFL);
	signal(SIGTERM, SIG_DFL);
	printf("sness-In parallel node creation - end2\n");

	return error;
}

